{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "使用滑动标准差作为置信边带，找到并填充异常值为滑动均值"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import joblib\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "import os\n",
    "import collections\n",
    "from itertools import zip_longest\n",
    "from joblib import Parallel, delayed\n",
    "# import matplotlib.pyplot as plt\n",
    "# %matplotlib inline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "def moving_average(data, window_size, window_type='rectang'):\n",
    "    '''\n",
    "    描述：\n",
    "        使用不同的窗口类型，获取滑动均值序列\n",
    "    参数:\n",
    "        window_type: triang or rectang\n",
    "    '''\n",
    "    if window_type == 'triang':\n",
    "        window = np.linspace(0, 2/(window_size+1), window_size+1)[1:]\n",
    "    elif window_type == 'rectang':\n",
    "        window = np.ones(int(window_size))/float(window_size)\n",
    "    else:\n",
    "        raise ValueError('window_type error!')\n",
    "        \n",
    "    return np.convolve(data, window, 'same')\n",
    "\n",
    "def df_moving_average(df, window_size, window_type='rectang', columns=None):\n",
    "    '''\n",
    "    描述：\n",
    "        使用滑动平均平滑序列\n",
    "    '''\n",
    "    if columns is None:\n",
    "        columns = ['Current_1', 'Current_2' ,'Current_3']\n",
    "    \n",
    "    for col in columns:\n",
    "        df[col] = moving_average(df[col], window_size, window_type)\n",
    "    return df\n",
    "\n",
    "def impute_anomalies_rolling_std(y, window_size, sigma=1.0):\n",
    "    '''\n",
    "    描述：\n",
    "        使用triang窗取均值，rectang窗提取方差，检测并使用均值填充异常点\n",
    "    '''\n",
    "    avg = moving_average(y, window_size,window_type= 'triang')\n",
    "    avg_list = avg.tolist()\n",
    "    residual = y - moving_average(y, window_size, 'rectang')\n",
    "    # Calculate the variation in the distribution of the residual\n",
    "    testing_std = pd.Series(residual).rolling(window=window_size, min_periods=1, center=False).std()\n",
    "    rolling_std = testing_std.replace(np.nan,testing_std.iloc[window_size-1]).values\n",
    "    \n",
    "    up_bound = avg + (sigma * rolling_std)\n",
    "    down_bound = avg - (sigma * rolling_std)\n",
    "    f = ((y>up_bound)|(y<down_bound)).values\n",
    "    y[f] = avg[f]\n",
    "    return y\n",
    "\n",
    "def df_impute_anomalies_rolling_std(df, window_size, sigma, columns=None):\n",
    "    if columns is None:\n",
    "        columns = ['Vibration_1', 'Vibration_2', 'Vibration_3']\n",
    "    for col in columns:\n",
    "        df[col] = impute_anomalies_rolling_std(df[col], window_size, sigma)\n",
    "    return df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "def optfunc_parallel(data_no, csv_nos, opt_func_list):\n",
    "    '''\n",
    "    描述：\n",
    "        并行处理数据\n",
    "    参数：\n",
    "        data_no：第几个plc\n",
    "        csv_nos：plc对应的sensor文件个数\n",
    "    '''\n",
    "    \n",
    "    input_dir = './sensors_clean/0%d/'%data_no\n",
    "    output_dir = './sensors_ad/0%d/'%data_no\n",
    "\n",
    "    if not os.path.exists('./sensors_ad/'):\n",
    "        os.mkdir('./sensors_ad')\n",
    "    if not os.path.exists('./sensors_ad/0%d'%data_no):\n",
    "        os.mkdir('./sensors_ad/0%d'%data_no)\n",
    "    \n",
    "    def basis_func(idx):\n",
    "        sensor = joblib.load(input_dir + '%d.lz4'%idx)\n",
    "#         sensor.columns = ['Vibration_1', 'Vibration_2', 'Vibration_3','Current_1', 'Current_2', 'Current_3', 'id', 'sort_col']\n",
    "        df_1 = opt_func_list[0](df = sensor, window_size = 15, window_type='rectang', columns = ['Current_1'])\n",
    "        df_2 = opt_func_list[1](df = df_1, window_size = 15, sigma=2.0, columns=['Vibration_1', 'Vibration_2', 'Vibration_3'])\n",
    "        joblib.dump(df_2, output_dir+'%d.lz4'%idx, compress='lz4')\n",
    "\n",
    "    Parallel(n_jobs=48,verbose=10)(delayed(basis_func)(i) for i in range(1,csv_nos+1))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Parallel(n_jobs=48)]: Using backend LokyBackend with 48 concurrent workers.\n",
      "[Parallel(n_jobs=48)]: Done   3 out of  48 | elapsed:   38.3s remaining:  9.6min\n",
      "[Parallel(n_jobs=48)]: Done   8 out of  48 | elapsed:   53.1s remaining:  4.4min\n",
      "[Parallel(n_jobs=48)]: Done  13 out of  48 | elapsed:  1.4min remaining:  3.7min\n",
      "[Parallel(n_jobs=48)]: Done  18 out of  48 | elapsed:  1.5min remaining:  2.5min\n",
      "[Parallel(n_jobs=48)]: Done  23 out of  48 | elapsed:  1.5min remaining:  1.6min\n",
      "[Parallel(n_jobs=48)]: Done  28 out of  48 | elapsed:  1.5min remaining:  1.1min\n",
      "[Parallel(n_jobs=48)]: Done  33 out of  48 | elapsed:  1.5min remaining:   41.9s\n",
      "[Parallel(n_jobs=48)]: Done  38 out of  48 | elapsed:  1.6min remaining:   24.5s\n",
      "[Parallel(n_jobs=48)]: Done  43 out of  48 | elapsed:  1.6min remaining:   10.9s\n",
      "[Parallel(n_jobs=48)]: Done  48 out of  48 | elapsed:  1.6min remaining:    0.0s\n",
      "[Parallel(n_jobs=48)]: Done  48 out of  48 | elapsed:  1.6min finished\n"
     ]
    }
   ],
   "source": [
    "optfunc_parallel(1, 48, [df_moving_average, df_impute_anomalies_rolling_std])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Parallel(n_jobs=48)]: Using backend LokyBackend with 48 concurrent workers.\n",
      "[Parallel(n_jobs=48)]: Done   3 out of  48 | elapsed:   15.2s remaining:  3.8min\n",
      "[Parallel(n_jobs=48)]: Done   8 out of  48 | elapsed:   17.0s remaining:  1.4min\n",
      "[Parallel(n_jobs=48)]: Done  13 out of  48 | elapsed:   17.9s remaining:   48.1s\n",
      "[Parallel(n_jobs=48)]: Done  18 out of  48 | elapsed:   22.6s remaining:   37.6s\n",
      "[Parallel(n_jobs=48)]: Done  23 out of  48 | elapsed:   26.4s remaining:   28.6s\n",
      "[Parallel(n_jobs=48)]: Done  28 out of  48 | elapsed:   55.4s remaining:   39.6s\n",
      "[Parallel(n_jobs=48)]: Done  33 out of  48 | elapsed:  1.1min remaining:   28.8s\n",
      "[Parallel(n_jobs=48)]: Done  38 out of  48 | elapsed:  1.1min remaining:   17.2s\n",
      "[Parallel(n_jobs=48)]: Done  43 out of  48 | elapsed:  1.1min remaining:    7.6s\n",
      "[Parallel(n_jobs=48)]: Done  48 out of  48 | elapsed:  1.1min remaining:    0.0s\n",
      "[Parallel(n_jobs=48)]: Done  48 out of  48 | elapsed:  1.1min finished\n"
     ]
    }
   ],
   "source": [
    "optfunc_parallel(2, 48, [df_moving_average, df_impute_anomalies_rolling_std])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Parallel(n_jobs=48)]: Using backend LokyBackend with 48 concurrent workers.\n",
      "[Parallel(n_jobs=48)]: Done   2 out of  37 | elapsed:   28.4s remaining:  8.3min\n",
      "[Parallel(n_jobs=48)]: Done   6 out of  37 | elapsed:   57.5s remaining:  5.0min\n",
      "[Parallel(n_jobs=48)]: Done  10 out of  37 | elapsed:  1.1min remaining:  3.0min\n",
      "[Parallel(n_jobs=48)]: Done  14 out of  37 | elapsed:  1.2min remaining:  1.9min\n",
      "[Parallel(n_jobs=48)]: Done  18 out of  37 | elapsed:  1.2min remaining:  1.3min\n",
      "[Parallel(n_jobs=48)]: Done  22 out of  37 | elapsed:  1.2min remaining:   48.9s\n",
      "[Parallel(n_jobs=48)]: Done  26 out of  37 | elapsed:  1.2min remaining:   30.6s\n",
      "[Parallel(n_jobs=48)]: Done  30 out of  37 | elapsed:  1.2min remaining:   17.0s\n",
      "[Parallel(n_jobs=48)]: Done  34 out of  37 | elapsed:  1.2min remaining:    6.5s\n",
      "[Parallel(n_jobs=48)]: Done  37 out of  37 | elapsed:  1.2min finished\n"
     ]
    }
   ],
   "source": [
    "optfunc_parallel(3, 37, [df_moving_average, df_impute_anomalies_rolling_std])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
